# Licensed under the MIT License
# Modified from https://github.com/lebr0nli/GEP
# Copyright (c) 2022 Alan Li
# Copyright (c) 2025 Zhi-Qiang Zhou

from __future__ import annotations

import atexit
import errno
import functools
import os
import re
import shlex
import shutil
import sys
import tempfile
import threading
from subprocess import PIPE
from subprocess import Popen
from typing import Callable
from typing import Iterator
from typing import ParamSpec
from typing import TypeVar

import lldb
from prompt_toolkit import ANSI
from prompt_toolkit import PromptSession
from prompt_toolkit.application import run_in_terminal
from prompt_toolkit.completion import CompleteEvent
from prompt_toolkit.completion import Completer
from prompt_toolkit.completion import Completion
from prompt_toolkit.document import Document
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.key_binding import KeyPressEvent
from prompt_toolkit.output import create_output

from pwndbg.dbg.lldb import LLDB

# global variables
# Generic placeholders for the parameter list and the return type of a wrapped
# callable; referenced by the annotations of ``wrap_with_history``.
P = ParamSpec("P")
T = TypeVar("T")

# Prompt text wrapped in ANSI SGR escapes (34 = blue foreground, 0 = reset).
# NOTE(review): not referenced in this part of the file; presumably consumed by
# the code that drives the prompt session - confirm against the caller.
PROMPT = ANSI("\x1b[34mpwndbg-lldb>\x1b[0m ")
# Plain-text history file (one command per line) shared by ``LLDBHistory`` and
# the fzf reverse search.
HISTORY_FILE = os.path.expanduser("~/.pwndbg_history")

# Base argv used by ``create_fzf_process`` for every fzf invocation.
# Per the fzf manual: --select-1 auto-accepts a sole match, --exit-0 quits when
# nothing matches, --tiebreak=index keeps input order among equal scores,
# --no-multi disables multi-select, and --height/--layout render fzf as an
# inline, top-down list instead of taking over the whole screen.
FZF_RUN_CMD = (
    "fzf",
    "--select-1",
    "--exit-0",
    "--tiebreak=index",
    "--no-multi",
    "--height=40%",
    "--layout=reverse",
)

# Extra fzf arguments appended by ``create_fzf_process`` only when a preview
# command is supplied.
# NOTE(review): "PRVIEW" looks like a typo for "PREVIEW"; the name is left
# untouched here because ``create_fzf_process`` refers to it.
FZF_PRVIEW_WINDOW_ARGS = (
    "--preview-window",
    "right:55%:wrap",
)


def get_lldb_completes(dbg: LLDB, query: str = "") -> list[str]:
    """
    Ask LLDB's command interpreter for the completions of `query`.

    The cursor is placed at the end of `query` and no limit is put on the
    number of matches (the `-1` argument). The entry at index 0 of the result
    list is skipped.
    NOTE(review): index 0 is presumably LLDB's common-prefix entry rather than
    a real candidate - confirm against the SBCommandInterpreter documentation.
    """
    found = lldb.SBStringList()
    interpreter: lldb.SBCommandInterpreter = dbg.debugger.GetCommandInterpreter()
    # The returned match count is not needed; the list itself is authoritative.
    interpreter.HandleCompletion(query, len(query), 0, -1, found)

    candidates: list[str] = []
    for index in range(1, found.GetSize()):
        candidates.append(found.GetStringAtIndex(index))
    return candidates


def safe_get_help_docs(dbg: LLDB, command: str) -> str | None:
    """
    Run `help <command>` through LLDB's command interpreter and return the
    captured output.

    The command is executed with history recording disabled, so the lookup
    does not show up in the interpreter's own command history.
    """
    captured = lldb.SBCommandReturnObject()
    interpreter: lldb.SBCommandInterpreter = dbg.debugger.GetCommandInterpreter()
    # Third argument is `add_to_history`; keep these internal lookups out of it.
    interpreter.HandleCommand(f"help {command}", captured, False)
    return captured.GetOutput()


def should_get_help_docs(dbg: LLDB, completion: str) -> bool:
    """
    Decide whether per-completion help docs are worth fetching.

    A single-word completion always qualifies. For a multi-word completion the
    help text of the full completion is compared with the help text of
    everything before its last word; docs are only useful when the two differ.
    """
    if " " in completion.strip():
        # Drop the last whitespace-separated word to obtain the parent command.
        parent_command = completion.rsplit(maxsplit=1)[0]
        parent_docs = safe_get_help_docs(dbg, parent_command)
        own_docs = safe_get_help_docs(dbg, completion)
        return parent_docs != own_docs
    return True


def get_lldb_completion_and_status(dbg: LLDB, query: str) -> tuple[list[str], bool]:
    """
    Return every completion for `query` together with a flag telling whether
    help docs should be fetched for them.

    Only the first completion is probed; its verdict is applied to the whole
    list. With no completions the flag is False.
    """
    completions = get_lldb_completes(dbg, query)
    # Short-circuit keeps the probe from running on an empty list.
    docs_wanted = bool(completions) and should_get_help_docs(dbg, completions[0])
    return completions, docs_wanted


def create_fzf_process(query: str, preview: str = "", pre_cmd: str = "") -> Popen[str]:
    """
    Spawn fzf with an initial query, a prompt showing `pre_cmd`, and an
    optional preview command.

    The process is started in text mode with stdin and stdout piped, so the
    caller feeds the candidate lines and reads back the selection. The preview
    window arguments are only added when `preview` is non-empty.
    """
    # A query starting with "!" gets a "^" put in front of it.
    # NOTE(review): presumably this stops fzf's extended-search syntax from
    # reading the leading "!" as negation - confirm against the fzf manual.
    initial_query = "^" + query if query.startswith("!") else query
    prompt = "> \x1b[35m" + pre_cmd + "\x1b[0m"

    cmd: tuple[str, ...] = (*FZF_RUN_CMD, "--query", initial_query, "--prompt", prompt)
    if preview:
        cmd = (*cmd, *FZF_PRVIEW_WINDOW_ARGS, "--preview", preview)
    return Popen(cmd, stdin=PIPE, stdout=PIPE, text=True)


def create_preview_fifos() -> tuple[str, str]:
    """
    Create a temporary directory and two FIFOs in it, return the paths of these FIFOs.

    The first path is the "input" FIFO and the second the "output" FIFO. The
    directory is removed again when the interpreter exits.

    This is modified from:
    https://github.com/infokiller/config-public/blob/652b4638a0a0ffed9743fa9e0ad2a8d4e4e90572/.config/ipython/profile_default/startup/ext/fzf_history.py#L128

    Raises:
        OSError: if the directory or one of the FIFOs cannot be created.
    """
    fifo_dir = tempfile.mkdtemp(prefix="pwndbg_lldb_tab_fzf_")
    # Register the cleanup before creating the FIFOs so the directory is not
    # leaked when mkfifo fails, and ignore errors so interpreter shutdown stays
    # quiet if the directory has already been removed by something else.
    atexit.register(shutil.rmtree, fifo_dir, ignore_errors=True)

    fifo_input_path = os.path.join(fifo_dir, "input")
    fifo_output_path = os.path.join(fifo_dir, "output")
    for fifo_path in (fifo_input_path, fifo_output_path):
        os.mkfifo(fifo_path)
    return fifo_input_path, fifo_output_path


def fzf_reverse_search(event: KeyPressEvent) -> None:
    """
    Reverse search history with fzf.

    The history file is read newest-first with duplicates and blank lines
    removed, offered to fzf with the text before the cursor as the initial
    query, and the selected entry (if any) replaces the current buffer.
    A missing history file is treated as an empty history.
    """

    def _fzf_reverse_search() -> None:
        # Read the history before spawning fzf so that a read error cannot
        # leave an orphaned fzf process attached to the terminal.
        try:
            with open(HISTORY_FILE) as f:
                history_lines = f.read().strip().split("\n")
        except FileNotFoundError:
            history_lines = []

        # Newest first; dict keys keep insertion order, so only the youngest
        # occurrence of each command survives.
        candidates = dict.fromkeys(line for line in reversed(history_lines) if line)

        p = create_fzf_process(event.app.current_buffer.document.text_before_cursor)
        # communicate() feeds stdin and drains stdout together and tolerates
        # fzf closing its stdin early, unlike writing to p.stdin directly.
        stdout, _ = p.communicate("".join(f"{line}\n" for line in candidates))
        if stdout:
            event.app.current_buffer.document = Document()  # clear buffer
            event.app.current_buffer.insert_text(stdout.strip())

    run_in_terminal(_fzf_reverse_search)


def fzf_tab_autocomplete(
    event: KeyPressEvent, dbg: LLDB, preview: str, fifo_in: str, fifo_out: str
) -> None:
    """
    Tab autocomplete with fzf.

    The text before the cursor is split at its last space: everything up to
    and including that space (`pre_cmd`) is completed by LLDB, and the
    remainder becomes fzf's initial query. When help docs are worth showing,
    they are collected per completion and served to fzf's preview command
    through the two FIFOs by a `FzfTabCompletePreviewThread`. The selected
    completion replaces the query part of the buffer, followed by a space.

    Args:
        event: the key press that triggered the completion.
        dbg: debugger whose command interpreter supplies completions and help.
        preview: shell command handed to fzf as its preview command.
        fifo_in: FIFO on which the preview command sends the selected index.
        fifo_out: FIFO on which the help text is sent back.
    """

    def _fzf_tab_autocomplete() -> None:
        target_text = (
            event.app.current_buffer.document.text_before_cursor.lstrip()
        )  # Ignore leading whitespaces
        # Everything up to and including the last space is the command context;
        # both parts are empty-safe when there is no space at all.
        head, separator, query = target_text.rpartition(" ")
        pre_cmd = head + separator

        all_completions, should_get_all_help_docs = get_lldb_completion_and_status(dbg, pre_cmd)
        if not all_completions:
            return

        # Keys are the zero-based line numbers fzf reports through `{n}`.
        completion_help_docs = {}
        if should_get_all_help_docs:
            completion_help_docs = {
                i: safe_get_help_docs(dbg, pre_cmd + completion)
                for i, completion in enumerate(all_completions)
            }

        p = create_fzf_process(
            query, preview=preview if should_get_all_help_docs else "", pre_cmd=pre_cmd
        )
        t = FzfTabCompletePreviewThread(fifo_in, fifo_out, completion_help_docs)
        t.start()
        try:
            # communicate() feeds stdin and drains stdout together and
            # tolerates fzf closing its stdin early.
            stdout, _ = p.communicate("".join(f"{completion}\n" for completion in all_completions))
        finally:
            # Always release the preview thread, even if fzf handling failed;
            # a thread left blocked on the FIFO would outlive this completion.
            t.stop()
        if stdout:
            event.app.current_buffer.delete_before_cursor(len(query))
            event.app.current_buffer.insert_text(stdout.rstrip() + " ")

    run_in_terminal(_fzf_tab_autocomplete)


class FzfTabCompletePreviewThread(threading.Thread):
    """
    A thread for previewing help docs of selected completion with fzf.

    Protocol: a peer writes a completion index to the input FIFO and closes
    it; the thread answers by writing the matching help text (possibly
    nothing) to the output FIFO and closing it.

    This is modified from:
    https://github.com/infokiller/config-public/blob/master/.config/ipython/profile_default/startup/ext/fzf_history.py#L72
    """

    def __init__(
        self,
        fifo_input_path: str,
        fifo_output_path: str,
        completion_help_docs: dict[int, str | None],
        **kwargs,
    ) -> None:
        """
        Args:
            fifo_input_path: FIFO from which completion indexes are read.
            fifo_output_path: FIFO to which help texts are written.
            completion_help_docs: help text per completion index; a missing
                or None entry produces an empty answer.
            **kwargs: forwarded to `threading.Thread`.
        """
        super().__init__(**kwargs)
        self.fifo_input_path = fifo_input_path
        self.fifo_output_path = fifo_output_path
        self.completion_help_docs = completion_help_docs
        self.is_done = threading.Event()

    def run(self) -> None:
        """Serve preview requests until `stop()` is called."""
        while not self.is_done.is_set():
            # Opening a FIFO for reading blocks until some writer opens it.
            with open(self.fifo_input_path, encoding="utf-8") as fifo_input:
                while not self.is_done.is_set():
                    data = fifo_input.read()
                    if not data:
                        # Writer closed its end: reopen and wait for the next one.
                        break
                    try:
                        # The output FIFO is opened even for a malformed
                        # request so the waiting reader receives EOF instead
                        # of blocking forever.
                        with open(self.fifo_output_path, "w", encoding="utf-8") as fifo_output:
                            try:
                                idx = int(data)
                            except ValueError:
                                continue
                            help_doc = self.completion_help_docs.get(idx)
                            if help_doc is not None:
                                fifo_output.write(help_doc)
                    except BrokenPipeError:
                        # The reader vanished before the answer was flushed;
                        # drop this answer and keep serving later requests.
                        continue

    def stop(self) -> None:
        """
        Ask the thread to finish and wait until it has.

        Only non-blocking FIFO opens are used, so this returns no matter where
        the thread currently is: not yet inside `run()`, blocked opening either
        FIFO, or already finished.
        """
        self.is_done.set()
        # Holding a read end of the output FIFO releases a thread that is
        # blocked opening it for writing; opening for reading never blocks
        # with O_NONBLOCK.
        drain_fd = os.open(self.fifo_output_path, os.O_RDONLY | os.O_NONBLOCK)
        try:
            while self.is_alive():
                self._wake_input_reader()
                self._drain(drain_fd)
                self.join(timeout=0.05)
        finally:
            os.close(drain_fd)

    def _wake_input_reader(self) -> None:
        """Release a thread blocked on the input FIFO, without ever blocking."""
        try:
            fd = os.open(self.fifo_input_path, os.O_WRONLY | os.O_NONBLOCK)
        except OSError as exc:
            # ENXIO: nobody has the FIFO open for reading, so there is nothing
            # to wake at this moment.
            if exc.errno != errno.ENXIO:
                raise
        else:
            # Closing straight away makes the reader see EOF.
            os.close(fd)

    @staticmethod
    def _drain(fd: int) -> None:
        """Discard whatever is pending on the non-blocking descriptor `fd`."""
        try:
            while os.read(fd, 65536):
                pass
        except BlockingIOError:
            # A writer is connected but has nothing buffered right now.
            pass


class LLDBHistory(FileHistory):
    """
    Manage your LLDB History

    The history is kept as plain text, one command per line, in `filename`.
    """

    def __init__(self, filename: str, ignore_duplicates: bool = False) -> None:
        """
        Args:
            filename: path of the history file.
            ignore_duplicates: when True, only the most recent occurrence of
                each command is loaded.
        """
        self.ignore_duplicates = ignore_duplicates
        super().__init__(filename=filename)

    def load_history_strings(self) -> list[str]:
        """
        Return the non-empty history lines, newest first.

        A missing history file yields an empty list.
        """
        try:
            with open(self.filename) as f:
                lines = f.read().splitlines()
        except FileNotFoundError:
            return []

        strings: list[str] = []
        # Set membership keeps duplicate detection O(1) per line; scanning the
        # result list would make loading quadratic in the history size.
        seen: set[str] = set()
        for string in reversed(lines):
            if not string:
                continue
            if self.ignore_duplicates:
                if string in seen:
                    continue
                seen.add(string)
            strings.append(string)
        return strings

    def store_string(self, string: str) -> None:
        """Append `string`, stripped of surrounding whitespace, as one line."""
        with open(self.filename, "a") as f:
            f.write(string.strip() + "\n")


class LLDBCompleter(Completer):
    """
    prompt_toolkit completer that takes its candidates from LLDB.
    """

    def __init__(self, dbg):
        super().__init__()
        self.dbg = dbg

    def get_completions(
        self, document: Document, complete_event: CompleteEvent
    ) -> Iterator[Completion]:
        """
        Yield a `Completion` for every LLDB candidate that extends the text
        before the cursor (leading whitespace ignored).

        Each yielded completion inserts only the part of the candidate that
        has not been typed yet. Help text is attached as `display_meta` only
        when `get_lldb_completion_and_status` reports docs as worthwhile.
        """
        typed = document.text_before_cursor.lstrip()  # Ignore leading whitespaces
        typed_length = len(typed)

        candidates, docs_wanted = get_lldb_completion_and_status(self.dbg, typed)

        for candidate in candidates:
            if not candidate.startswith(typed):
                continue

            # Empty help output is normalised to None so no meta column shows.
            if docs_wanted:
                meta = safe_get_help_docs(self.dbg, candidate) or None
            else:
                meta = None

            # Only the not-yet-typed tail of the candidate gets inserted.
            remainder = candidate[typed_length:]
            # Show the word currently being typed followed by that tail.
            label = re.split(r"\W+", typed)[-1] + remainder
            yield Completion(remainder, display=label, display_meta=meta)


def wrap_with_history(function: Callable[P, T]) -> Callable[P, T]:
    """
    Return a pass-through wrapper around `function`.

    The wrapper forwards all positional and keyword arguments unchanged and
    returns the wrapped function's result; it carries the wrapped function's
    metadata (name, docstring, `__wrapped__`).
    """

    def _passthrough(*args: P.args, **kwargs: P.kwargs) -> T:
        result = function(*args, **kwargs)
        return result

    # Same effect as decorating with functools.wraps(function).
    return functools.update_wrapper(_passthrough, function)


def get_prompt_session(dbg: LLDB) -> PromptSession:
    """
    Build the prompt session used to read commands.

    The session uses `LLDBHistory` on `HISTORY_FILE` (duplicates ignored),
    `LLDBCompleter` for completions (not triggered while typing), writes to
    the process's original stdout, and binds:

    * Ctrl-R to the fzf history search;
    * Ctrl-I (Tab) to the fzf completion, with a preview command that talks to
      a pair of freshly created FIFOs.
    """
    bindings = KeyBindings()

    # key binding for fzf history search
    bindings.add("c-r")(fzf_reverse_search)
    # key binding for fzf tab completion
    fifo_in, fifo_out = create_preview_fifos()
    # fzf substitutes `{n}` with the zero-based index of the highlighted line.
    # The command is run by a shell, so the FIFO paths are quoted to survive a
    # temporary directory whose path contains spaces or shell metacharacters.
    preview = f"echo {{n}} > {shlex.quote(fifo_in)}\ncat {shlex.quote(fifo_out)}"
    bindings.add("c-i")(
        functools.partial(
            fzf_tab_autocomplete,
            dbg=dbg,
            preview=preview,
            fifo_in=fifo_in,
            fifo_out=fifo_out,
        )
    )

    return PromptSession(
        history=LLDBHistory(HISTORY_FILE, ignore_duplicates=True),
        completer=LLDBCompleter(dbg),
        complete_while_typing=False,
        key_bindings=bindings,
        output=create_output(stdout=sys.__stdout__),
    )
